import re

from sqlalchemy import Column
from sqlalchemy import Identity
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import Sequence
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.engine import URL
from sqlalchemy.exc import ArgumentError
from sqlalchemy.schema import CreateTable
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_not_


class _IdentityDDLFixture(testing.AssertsCompiledSQL):
    """Shared CREATE TABLE compile checks for ``Identity`` columns.

    Concrete test classes mix this in.  A class that sets ``__dialect__``
    to ``"default_enhanced"`` never takes the backend-specific branches
    below, because ``testing.against`` is not consulted in that case.
    """

    # Test-runner marker.  NOTE(review): presumably limits which backend
    # drivers these tests run against -- confirm in sqlalchemy.testing.
    __sparse_driver_backend__ = True

    def _running_against(self, backend):
        """Tell whether backend-specific expectations for *backend* apply.

        ``testing.against`` is only called when the class is not pinned to
        the ``default_enhanced`` dialect.
        """
        if getattr(self, "__dialect__", None) == "default_enhanced":
            return False
        return testing.against(backend)

    @testing.combinations(
        ({"always": True}, "ALWAYS AS IDENTITY"),
        (
            {"always": False, "start": 5},
            "BY DEFAULT AS IDENTITY (START WITH 5)",
        ),
        (
            {"always": True, "increment": 2},
            "ALWAYS AS IDENTITY (INCREMENT BY 2)",
        ),
        (
            {"increment": 2, "start": 5},
            "BY DEFAULT AS IDENTITY (INCREMENT BY 2 START WITH 5)",
        ),
        (
            {"always": True, "increment": 2, "start": 0, "minvalue": 0},
            "ALWAYS AS IDENTITY (INCREMENT BY 2 START WITH 0 MINVALUE 0)",
        ),
        (
            {"always": False, "increment": 2, "start": 1, "maxvalue": 5},
            "BY DEFAULT AS IDENTITY (INCREMENT BY 2 START WITH 1 MAXVALUE 5)",
        ),
        (
            {"always": True, "increment": 2, "start": 1, "nomaxvalue": True},
            "ALWAYS AS IDENTITY (INCREMENT BY 2 START WITH 1 NO MAXVALUE)",
        ),
        (
            {"always": False, "increment": 2, "start": 0, "nominvalue": True},
            "BY DEFAULT AS IDENTITY "
            "(INCREMENT BY 2 START WITH 0 NO MINVALUE)",
        ),
        (
            {"always": True, "start": 1, "maxvalue": 10, "cycle": True},
            "ALWAYS AS IDENTITY (START WITH 1 MAXVALUE 10 CYCLE)",
        ),
        (
            {"always": False, "cache": 1000, "cycle": False},
            "BY DEFAULT AS IDENTITY (CACHE 1000 NO CYCLE)",
        ),
        ({"cycle": True}, "BY DEFAULT AS IDENTITY (CYCLE)"),
    )
    def test_create_ddl(self, identity_args, text):
        """Each ``Identity`` option set renders the expected GENERATED clause.

        The same DDL is expected from the table and from its copy made with
        ``to_metadata()``.
        """
        if self._running_against("oracle"):
            # On oracle the expected text spells these keywords without the
            # embedded space.
            for generic, oracle_spelling in (
                ("NO MINVALUE", "NOMINVALUE"),
                ("NO MAXVALUE", "NOMAXVALUE"),
                ("NO CYCLE", "NOCYCLE"),
            ):
                text = text.replace(generic, oracle_spelling)

        expected_ddl = (
            "CREATE TABLE foo_table (foo INTEGER GENERATED %s)" % text
        )

        identity_column = Column("foo", Integer(), Identity(**identity_args))
        source_table = Table("foo_table", MetaData(), identity_column)
        self.assert_compile(CreateTable(source_table), expected_ddl)

        copied_table = source_table.to_metadata(MetaData())
        self.assert_compile(CreateTable(copied_table), expected_ddl)

    def test_other_options(self):
        """``unique=True`` still yields a UNIQUE constraint next to Identity.

        The expected DDL contains no NOT NULL even though the column is
        declared ``nullable=False``.
        """
        identity_column = Column(
            "foo",
            Integer(),
            Identity(always=True, start=3),
            nullable=False,
            unique=True,
        )
        table = Table("foo_table", MetaData(), identity_column)

        expected_ddl = (
            "CREATE TABLE foo_table ("
            "foo INTEGER GENERATED ALWAYS AS IDENTITY (START "
            "WITH 3), UNIQUE (foo))"
        )
        self.assert_compile(CreateTable(table), expected_ddl)

    def test_autoincrement_true(self):
        """An Identity primary key with ``autoincrement=True`` compiles."""
        pk_column = Column(
            "foo",
            Integer(),
            Identity(always=True, start=3),
            primary_key=True,
            autoincrement=True,
        )
        table = Table("foo_table", MetaData(), pk_column)

        expected_ddl = (
            "CREATE TABLE foo_table ("
            "foo INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 3)"
            ", PRIMARY KEY (foo))"
        )
        self.assert_compile(CreateTable(table), expected_ddl)

    def test_nullable_kwarg(self):
        """Check ``nullable`` on Identity columns and how it is rendered.

        A column with no explicit ``nullable`` reports ``False``; only the
        explicitly nullable column gets a suffix, and only on postgresql.
        """
        table = Table(
            "t",
            MetaData(),
            Column("a", Integer(), Identity(), nullable=False),
            Column("b", Integer(), Identity(), nullable=True),
            Column("c", Integer(), Identity()),
        )

        for column, expected_flag in (
            (table.c.a, False),
            (table.c.b, True),
            (table.c.c, False),
        ):
            is_(column.nullable, expected_flag)

        null_suffix = " NULL" if self._running_against("postgresql") else ""

        ddl_template = (
            "CREATE TABLE t ("
            "a INTEGER GENERATED BY DEFAULT AS IDENTITY, "
            "b INTEGER GENERATED BY DEFAULT AS IDENTITY%s, "
            "c INTEGER GENERATED BY DEFAULT AS IDENTITY"
            ")"
        )
        self.assert_compile(CreateTable(table), ddl_template % null_suffix)


class IdentityDDL(_IdentityDDLFixture, fixtures.TestBase):
    """Run the shared Identity DDL checks without pinning ``__dialect__``."""

    # this uses the connection dialect
    # NOTE(review): ``__requires__`` names a requirement that the test
    # runner presumably checks before running this class -- confirm.
    __requires__ = ("identity_columns_standard",)


class DefaultDialectIdentityDDL(_IdentityDDLFixture, fixtures.TestBase):
    """Run the shared Identity DDL checks with ``__dialect__`` pinned.

    With this value the fixture's ``testing.against(...)`` branches are
    short-circuited, so only the generic expected strings are compared.
    """

    # this uses the default dialect
    __dialect__ = "default_enhanced"


class NotSupportingIdentityDDL(testing.AssertsCompiledSQL, fixtures.TestBase):
    """Identity is left out of DDL on dialects without identity columns.

    Each test compiles a table carrying ``Identity`` and compares it with
    the DDL of an equivalent table declared without it.
    """

    def get_dialect(self, dialect):
        """Instantiate the dialect named *dialect* with identity disabled.

        For ``oracle`` and ``postgresql`` the instance has
        ``supports_identity_columns`` set to ``False``; the other dialects
        are returned as constructed.
        """
        dialect_cls = URL.create(dialect).get_dialect()
        instance = dialect_cls()
        if dialect in ("oracle", "postgresql"):
            instance.supports_identity_columns = False
        return instance

    def _assert_identity_free_ddl(self, table, reference, dialect_name):
        """Assert *table* compiles to the same DDL as *reference*.

        Newlines and tabs are removed from the reference DDL before it is
        handed to ``assert_compile``.
        """
        target = self.get_dialect(dialect_name)
        reference_ddl = CreateTable(reference).compile(dialect=target).string
        flattened = re.sub(r"[\n\t]", "", reference_ddl)
        self.assert_compile(CreateTable(table), flattened, dialect=target)

    @testing.combinations("sqlite", "mysql", "mariadb", "postgresql", "oracle")
    def test_identity_is_ignored(self, dialect):
        """A plain Identity column compiles like a ``nullable=False`` one."""
        with_identity = Table(
            "foo_table",
            MetaData(),
            Column("foo", Integer(), Identity(always=True, start=3)),
        )
        without_identity = Table(
            "foo_table",
            MetaData(),
            Column("foo", Integer(), nullable=False),
        )
        self._assert_identity_free_ddl(
            with_identity, without_identity, dialect
        )

    @testing.combinations(
        "sqlite",
        "mysql",
        "mariadb",
        "postgresql",
        "oracle",
        argnames="dialect",
    )
    @testing.combinations(True, "auto", argnames="autoincrement")
    def test_identity_is_ignored_in_pk(self, dialect, autoincrement):
        """An Identity primary key compiles like the same key without it."""
        pk_with_identity = Column(
            "foo",
            Integer(),
            Identity(always=True, start=3),
            primary_key=True,
            autoincrement=autoincrement,
        )
        with_identity = Table("foo_table", MetaData(), pk_with_identity)

        plain_pk = Column(
            "foo", Integer(), primary_key=True, autoincrement=autoincrement
        )
        without_identity = Table("foo_table", MetaData(), plain_pk)

        self._assert_identity_free_ddl(
            with_identity, without_identity, dialect
        )


class IdentityTest(fixtures.TestBase):
    """Construction-time behaviour of ``Identity`` on a ``Column``."""

    def test_server_default_onupdate(self):
        """Identity rejects ``server_default`` and ``server_onupdate``."""
        message = (
            "A column with an Identity object cannot specify a "
            "server_default or a server_onupdate argument"
        )

        def build_table(**column_kwargs):
            Table(
                "t",
                MetaData(),
                Column("y", Integer, Identity(), **column_kwargs),
            )

        for option in ("server_default", "server_onupdate"):
            assert_raises_message(
                ArgumentError, message, build_table, **{option: "42"}
            )

    def test_to_metadata(self):
        """``to_metadata()`` gives the copied column its own Identity."""

        def check_bound(identity, column):
            # The identity points at its column and is that column's
            # server_default.
            # NOTE(review): matching server_onupdate checks were commented
            # out in this test; the reason is not visible here.
            is_(identity.column, column)
            is_(column.server_default, identity)

        source_identity = Identity(always=False, cycle=True, start=123)
        source_table = Table(
            "t",
            MetaData(),
            Column("x", Integer),
            Column("y", Integer, source_identity),
        )
        check_bound(source_identity, source_table.c.y)

        copied_table = source_table.to_metadata(MetaData())
        copied_identity = copied_table.c.y.server_default

        is_not_(source_identity, copied_identity)

        # Copying must leave the source table's wiring untouched.
        check_bound(source_identity, source_table.c.y)
        check_bound(copied_identity, copied_table.c.y)

    def test_autoincrement_column(self):
        """Only a primary-key Identity column is the autoincrement column."""
        with_pk = Table(
            "t",
            MetaData(),
            Column("y", Integer, Identity(), primary_key=True),
        )
        assert with_pk._autoincrement_column is with_pk.c.y

        without_pk = Table("t2", MetaData(), Column("y", Integer, Identity()))
        assert without_pk._autoincrement_column is None

    def test_identity_and_sequence(self):
        """Combining Identity and Sequence on one column is an error."""

        def build_table():
            return Table(
                "foo_table",
                MetaData(),
                Column("foo", Integer(), Identity(), Sequence("foo_seq")),
            )

        # The text must match the library's message exactly as it is raised.
        message = "An column cannot specify both Identity and Sequence."
        assert_raises_message(ArgumentError, message, build_table)

    def test_identity_autoincrement_false(self):
        """Identity together with ``autoincrement=False`` is an error."""

        def build_table():
            return Table(
                "foo_table",
                MetaData(),
                Column("foo", Integer(), Identity(), autoincrement=False),
            )

        message = (
            "A column with an Identity object cannot specify "
            "autoincrement=False"
        )
        assert_raises_message(ArgumentError, message, build_table)
